/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package studio.raptor.ddal.config.parser;

import com.google.common.base.Strings;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import studio.raptor.ddal.common.exception.GenericException;
import studio.raptor.ddal.common.util.SplitUtil;
import studio.raptor.ddal.config.config.ShardConfig;
import studio.raptor.ddal.config.exception.ConfigErrCodes;
import studio.raptor.ddal.config.model.rule.ShardRule;
import studio.raptor.ddal.config.model.rule.ShardRules;
import studio.raptor.ddal.config.model.shard.DataSource;
import studio.raptor.ddal.config.model.shard.DataSourceGroup;
import studio.raptor.ddal.config.model.shard.DataSourceGroups;
import studio.raptor.ddal.config.model.shard.PhysicalDBCluster;
import studio.raptor.ddal.config.model.shard.PhysicalDBClusters;
import studio.raptor.ddal.config.model.shard.Shard;
import studio.raptor.ddal.config.model.shard.ShardGroups;
import studio.raptor.ddal.config.model.shard.Shards;
import studio.raptor.ddal.config.model.shard.Table;
import studio.raptor.ddal.config.model.shard.Tables;
import studio.raptor.ddal.config.model.shard.VirtualDb;
import studio.raptor.ddal.config.model.shard.VirtualDbs;

public class ShardConfigParser {

  private static final XPath XPATH = XPathFactory.newInstance().newXPath();

  public static void parse(ShardConfig shardConfig, String xml) throws GenericException {
    try {
      DocumentBuilderFactory domFactory = DocumentBuilderFactory.newInstance();
      DocumentBuilder builder = domFactory.newDocumentBuilder();
      InputSource is = new InputSource(new StringReader(xml));
      Document document = builder.parse(is);
      Element root = document.getDocumentElement();

      createShardGroups(shardConfig, root);
      createVirtualdbs(shardConfig, root);
      createPhysicalDBClusters(shardConfig, root);
      createDataSources(shardConfig, root);
    } catch (Exception e) {
      throw new GenericException(ConfigErrCodes.CONFIG_103, e.getMessage());
    }
  }

  private static void createShardGroups(ShardConfig shardConfig, Element root)
      throws XPathExpressionException {
    NodeList shardGroupsDom = (NodeList) XPATH
        .evaluate("shardGroups/shardGroup", root, XPathConstants.NODESET);

    ShardGroups shardGroups = new ShardGroups();
    for (int i = 0; i < shardGroupsDom.getLength(); i++) {
      Node shardGroupDom = shardGroupsDom.item(i);

      ShardGroups.ShardGroup shardGroup = new ShardGroups.ShardGroup();
      shardGroup.setName(XPATH.evaluate("@name", shardGroupDom));
      createShardZones(shardGroup, shardGroupDom);
      shardGroups.addGroup(shardGroup);
    }

    shardConfig.setShardGroups(shardGroups);
  }

  private static void createShardZones(ShardGroups.ShardGroup shardGroup, Node shardGroupDom)
      throws XPathExpressionException {
    NodeList shardZonesDom = (NodeList) XPATH
        .evaluate("shardZone", shardGroupDom, XPathConstants.NODESET);

    Shards shards = new Shards();
    String[] shardZones = new String[shardZonesDom.getLength()];

    if (shardZonesDom.getLength() > 0) {
      for (int i = 0; i < shardZonesDom.getLength(); i++) {
        Node shardZoneDom = shardZonesDom.item(i);

        String shardZone = XPATH.evaluate("@name", shardZoneDom);
        shardZones[i] = shardZone;

        createShard(shards, shardZoneDom, shardZone);
      }

    } else {
      createShard(shards, shardGroupDom, null);
    }

    shardGroup.setShardZones(shardZones);
    shardGroup.setShards(shards);
  }

  private static void createShard(Shards shards, Node parentDom, String shardZone)
      throws XPathExpressionException {
    NodeList shardsDom = (NodeList) XPATH.evaluate("shard", parentDom, XPathConstants.NODESET);
    for (int i = 0; i < shardsDom.getLength(); i++) {
      Node shardDom = shardsDom.item(i);

      Shard shard = new Shard();
      shard.setName(XPATH.evaluate("@name", shardDom));
      shard.setDsGroup(XPATH.evaluate("@dsGroup", shardDom));
      shard.setSchema(XPATH.evaluate("@schema", shardDom));
      if (null != shardZone) {
        shard.setShardZone(shardZone);
      }

      shards.add(shard);
    }
  }

  private static void createDataSources(ShardConfig shardConfig, Element root)
      throws XPathExpressionException {
    NodeList dataSourcesDom = (NodeList) XPATH
        .evaluate("dataSources/group", root, XPathConstants.NODESET);
    DataSourceGroups dataSources = new DataSourceGroups();
    for (int i = 0; i < dataSourcesDom.getLength(); i++) {
      Node groupDom = dataSourcesDom.item(i);

      DataSourceGroup group = new DataSourceGroup();
      group.setName(XPATH.evaluate("@name", groupDom));
      group.setRelaCluster(XPATH.evaluate("@relaCluster", groupDom));

      String balance = XPATH.evaluate("@balance", groupDom);
      if (!Strings.isNullOrEmpty(balance)) {
        group.setBalance(balance);
      }

      DataSource[] dsArr = createDataSourceArr(groupDom);
      group.setDataSources(dsArr);
      dataSources.add(group);
    }
    shardConfig.setDataSourceGroups(dataSources);
  }

  private static DataSource[] createDataSourceArr(Node groupDom) throws XPathExpressionException {
    NodeList dataSourcesDom = (NodeList) XPATH
        .evaluate("dataSource", groupDom, XPathConstants.NODESET);
    DataSource[] dsArr = new DataSource[dataSourcesDom.getLength()];

    for (int i = 0; i < dataSourcesDom.getLength(); i++) {
      Node dataSourceDom = dataSourcesDom.item(i);
      DataSource ds = new DataSource();

      ds.setUser(XPATH.evaluate("@user", dataSourceDom));
      ds.setPwd(XPATH.evaluate("@pwd", dataSourceDom));
      ds.setDbInstName(XPATH.evaluate("@dbInstName", dataSourceDom));
      ds.setAccessLevel(XPATH.evaluate("@accessLevel", dataSourceDom));
      ds.setDbDriver(XPATH.evaluate("@dbDriver", dataSourceDom));
      Map<String, String> params = createParams(dataSourceDom);
      ds.setParams(params);
      dsArr[i] = ds;
    }
    return dsArr;
  }

  private static Map<String, String> createParams(Node dataSourceDom)
      throws XPathExpressionException {
    NodeList propertiesDom = (NodeList) XPATH
        .evaluate("params/property", dataSourceDom, XPathConstants.NODESET);

    Map<String, String> map = new HashMap<>();
    for (int i = 0; i < propertiesDom.getLength(); i++) {
      Node propertyDom = propertiesDom.item(i);

      map.put(XPATH.evaluate("@name", propertyDom), XPATH.evaluate("@value", propertyDom));
    }
    return map;
  }

  private static void createPhysicalDBClusters(ShardConfig shardConfig, Element root)
      throws XPathExpressionException {
    NodeList physicalDBClustersDom = (NodeList) XPATH
        .evaluate("physicalDBClusters/physicalDBCluster", root, XPathConstants.NODESET);

    PhysicalDBClusters physicalDBClusters = new PhysicalDBClusters();
    for (int i = 0; i < physicalDBClustersDom.getLength(); i++) {
      Node physicalDBClusterDom = physicalDBClustersDom.item(i);

      PhysicalDBCluster physicalDBCluster = new PhysicalDBCluster();
      physicalDBCluster.setName(XPATH.evaluate("@name", physicalDBClusterDom));
      physicalDBCluster.setType(XPATH.evaluate("@type", physicalDBClusterDom));

      NodeList dbInstancesDom = (NodeList) XPATH
          .evaluate("dbInstance", physicalDBClusterDom, XPathConstants.NODESET);
      for (int j = 0; j < dbInstancesDom.getLength(); j++) {
        Node dbInstanceDom = dbInstancesDom.item(j);

        PhysicalDBCluster.DBInstance db = new PhysicalDBCluster.DBInstance();
        db.setHostname(XPATH.evaluate("@hostname", dbInstanceDom));
        db.setName(XPATH.evaluate("@name", dbInstanceDom));
        db.setPort(Integer.parseInt(XPATH.evaluate("@port", dbInstanceDom)));
        db.setRw(XPATH.evaluate("@rw", dbInstanceDom));
        db.setStatus(XPATH.evaluate("@status", dbInstanceDom));
        db.setH2db(XPATH.evaluate("@h2db", dbInstanceDom));
        db.setH2dir(XPATH.evaluate("@h2dir", dbInstanceDom));

        String sid = XPATH.evaluate("@sid", dbInstanceDom);
        if (!Strings.isNullOrEmpty(sid)) {
          db.setSid(sid);
        }

        String role = XPATH.evaluate("@role", dbInstanceDom);
        if (!Strings.isNullOrEmpty(role)) {
          db.setRole(role);
        }

        physicalDBCluster.add(db);
      }
      physicalDBClusters.add(physicalDBCluster);
    }
    shardConfig.setPhysicalDBClusters(physicalDBClusters);
  }

  private static void createVirtualdbs(ShardConfig shardConfig, Element root)
      throws XPathExpressionException, IllegalAccessException, InstantiationException, ClassNotFoundException {
    NodeList virtualDBsDom = (NodeList) XPATH
        .evaluate("virtualDBs/virtualDB", root, XPathConstants.NODESET);

    VirtualDbs virtualDbs = new VirtualDbs();
    for (int i = 0; i < virtualDBsDom.getLength(); i++) {
      Node virtualDBDom = virtualDBsDom.item(i);

      VirtualDb virtualDb = new VirtualDb();
      virtualDb.setName(XPATH.evaluate("@name", virtualDBDom));
      virtualDb.setRmOwner("true".equalsIgnoreCase(XPATH.evaluate("@rmOwner", virtualDBDom)));
      virtualDb.setSqlMaxLimit(Integer.parseInt(XPATH.evaluate("@sqlMaxLimit", virtualDBDom)));
      virtualDb.setShardGroup(XPATH.evaluate("@shardGroup", virtualDBDom));

      ShardGroups.ShardGroup shardGroup = shardConfig.getShardGroups()
          .get(virtualDb.getShardGroup());
      virtualDb.setShards(shardGroup.getShards());

      Tables tables = createTables(virtualDb, virtualDBDom, shardConfig);
      virtualDb.setTables(tables);

      VirtualDb.Seqs seqs = createSeqs(virtualDBDom);
      virtualDb.setSeqs(seqs);

      virtualDbs.add(virtualDb);
    }
    shardConfig.setVirtualDbs(virtualDbs);
  }

  private static VirtualDb.Seqs createSeqs(Node virtualDBDom) throws XPathExpressionException {
    NodeList seqsDom = (NodeList) XPATH.evaluate("seq", virtualDBDom, XPathConstants.NODESET);

    VirtualDb.Seqs seqs = new VirtualDb.Seqs();
    for (int i = 0; i < seqsDom.getLength(); i++) {
      Node seqDom = seqsDom.item(i);

      VirtualDb.Seq seq = new VirtualDb.Seq();
      seq.setName(XPATH.evaluate("@name", seqDom));
      seq.setType(XPATH.evaluate("@type", seqDom));

      String incr = XPATH.evaluate("@incr", seqDom);
      if (!Strings.isNullOrEmpty(incr)) {
        seq.setIncr(incr);
      }

      String start = XPATH.evaluate("@start", seqDom);
      if (!Strings.isNullOrEmpty(start)) {
        seq.setStart(start);
      }

      String cache = XPATH.evaluate("@cache", seqDom);
      if (!Strings.isNullOrEmpty(cache)) {
        seq.setCache(cache);
      }
      seqs.add(seq);
    }
    return seqs;
  }

  private static Tables createTables(VirtualDb virtualDb, Node virtualDBDom,
      ShardConfig shardConfig)
      throws XPathExpressionException, ClassNotFoundException, IllegalAccessException, InstantiationException {
    NodeList tablesDom = (NodeList) XPATH.evaluate("table", virtualDBDom, XPathConstants.NODESET);

    Tables tables = new Tables();
    for (int i = 0; i < tablesDom.getLength(); i++) {
      Node tableDom = tablesDom.item(i);

      Table parentTable = createTableBaseParam(tableDom, virtualDb.getShards().allShardNames());
      //分片表解析
      if (!parentTable.isGlobal()) {
        //DatabaseRule
        String databaseRule = XPATH.evaluate("@databaseRule", tableDom);
        if (!Strings.isNullOrEmpty(databaseRule)) {
          ShardRule dbShardRule = shardConfig.getRuleConfig().getShardRules().get(databaseRule);

          parentTable.setDatabaseRule(dbShardRule);
          parentTable.addShardColumns(dbShardRule.getShardColumns());
        }

        //TableRule
        addTableRule(parentTable, tableDom, shardConfig.getRuleConfig().getShardRules());

        if (null == parentTable.getDatabaseRule() && null == parentTable.getTableRule()) {
          throw new XPathExpressionException(
              "Database rule and Table rule can not both be null when actualTables exist, table name = "
                  + parentTable.getName());
        }

        //SubTable
        //subTable处理
        NodeList subTablesDom = (NodeList) XPATH
            .evaluate("subTable", tableDom, XPathConstants.NODESET);
        for (int j = 0; j < subTablesDom.getLength(); j++) {
          Node subTableDom = subTablesDom.item(j);

          Table subTable = createTableBaseParam(subTableDom, parentTable.getDatabaseShards());
          subTable.setParentTable(parentTable);
          subTable.setSubTable(true);
          subTable.setDatabaseShards(parentTable.getDatabaseShards());
          subTable.setDatabaseRule(parentTable.getDatabaseRule());
          subTable.addShardColumns(parentTable.getDatabaseRule().getShardColumns());
          addTableRule(subTable, subTableDom, shardConfig.getRuleConfig().getShardRules());
          tables.add(subTable);
        }
      }

      tables.add(parentTable);
    }
    return tables;
  }

  private static Table createTableBaseParam(Node tableDom, List<String> defaultShards)
      throws XPathExpressionException {
    Table table = new Table();
    table.setName(XPATH.evaluate("@name", tableDom).toUpperCase());
    String shardOnTable = XPATH.evaluate("@shards", tableDom);
    if (!Strings.isNullOrEmpty(shardOnTable)) {
      String[] shardNames = SplitUtil.split(shardOnTable, ',', '$', '-', '[', ']');
      Arrays.sort(shardNames);// 排序以解决同一分片集中的分片配置顺序不同导致的路由错乱
      table.setDatabaseShards(Arrays.asList(shardNames));
    } else {
      table.setDatabaseShards(defaultShards);
    }
    String type = XPATH.evaluate("@type", tableDom);
    if (!Strings.isNullOrEmpty(type)) {
      table.setGlobal(true);
    }

    return table;
  }

  private static void addTableRule(Table table, Node tableDom, ShardRules shardRules)
      throws XPathExpressionException {
    String actualTablesString = XPATH.evaluate("@actualTables", tableDom);
    if (!Strings.isNullOrEmpty(actualTablesString)) {
      //actualTables
      String[] actualTables = SplitUtil.split(actualTablesString, ',', '$', '-', '[', ']');
      Arrays.sort(actualTables);// 排序以解决同一分片集中的分片配置顺序不同导致的路由错乱
      table.setActualTables(Arrays.asList(actualTables));

      //tableRule
      String tableRuleName = XPATH.evaluate("@tableRule", tableDom);
      if (Strings.isNullOrEmpty(tableRuleName)) {
        throw new XPathExpressionException(
            "Table rule can not be null when actualTables exist, table name = " + table.getName());
      }

      ShardRule tableRule = shardRules.get(tableRuleName);
      table.setTableRule(tableRule);
      table.addShardColumns(tableRule.getShardColumns());
    }
  }

}
